package com.skyline.courier.net.provider.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.Attribute;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.skyline.courier.net.ClientMessageReceiver;
import com.skyline.courier.net.Connection;
import com.skyline.courier.net.ConnectionManager;
import com.skyline.courier.net.SendFuture;
import com.skyline.courier.net.TransferBean;

public class NettyClientInitializer extends NettyEndPointInitializer {
	private static final Logger LOGGER = LoggerFactory.getLogger(NettyClientInitializer.class);
	protected ConnectionManager connectionManager;
	
	public void setConnectionManager(ConnectionManager connectionManager) {
		this.connectionManager = connectionManager;
	}

	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		super.initChannel(ch);
		
		ChannelPipeline pipeline = ch.pipeline();
		
		pipeline.addLast("handler", new ChannelDuplexHandler() {
			
			@Override
			public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
				SocketAddress address = ctx.channel().remoteAddress();
				if(msg instanceof TransferBean) {
					TransferBean transferBean = (TransferBean) msg;
					Object target = transferBean.getTarget();
					
					LOGGER.info("从[" + address + "]收到消息[" + target + "]");
					NettyClientInitializer.this.statistic.receivedMsg(address, System.currentTimeMillis());

					Attribute<Map<Long, SendFuture<?>>> futuresAttr = ctx.channel().attr(NettyConnection.FUTURES_ATTRIBUTE_KEY);
					(new ClientMessageReceiver()).onMessageReceive(transferBean, futuresAttr.get());
				} else {
					LOGGER.info("从[" + address + "]收到非TransferBean实现的消息[" + msg + "]");
				}
			}
			
			@Override
			public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
				if (cause instanceof IOException) {
					Channel channel = ctx.channel();
					SocketAddress address = channel.remoteAddress();
					
					NettyClientInitializer.LOGGER.info("链接发生异常，channel[" + channel + "]", cause);
					
					ctx.close();
					
					disconnectAddress(address);
				} else {
					super.exceptionCaught(ctx, cause);
				}
			}
			
			private void disconnectAddress(SocketAddress address) {
				if(connectionManager != null) {
					Connection connection = connectionManager.removeConnected(address);
					if(connection != null) {
						connectionManager.addDisconnectAddress(address, connection);
					}
				}
			}
			
		});
		
	}

	
}
